#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# (c) B.Kerler 2018-2024 under GPLv3 license
# If you use my code, make sure you refer to my name
#
# !!!!! If you use this code in commercial products, your product is automatically
# GPLv3 and has to be open sourced under GPLv3 as well. !!!!!
import argparse
import hashlib
import time

import requests
import serial
import serial.tools.list_ports
from Exscript.protocols.telnetlib import Telnet

try:
    from edlclient.Tools.qc_diag import qcdiag
except ImportError as e:
    pass

import usb.core
from enum import Enum

from passlib.hash import md5_crypt

try:
    from edlclient.Tools.sierrakeygen import SierraKeygen
except ImportError:
    import os, sys, inspect

    current_dir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
    parent_dir = os.path.dirname(current_dir)
    sys.path.insert(0, parent_dir)
    from sierrakeygen import SierraKeygen

import logging
import logging.config
import logging.handlers
import colorama

itoa64 = bytearray(b"./0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz")


def _crypt_to64(s, v, n):
    out = bytearray()
    while --n >= 0:
        out.append(itoa64[v & 0x3f])
        v >>= 6


class ColorFormatter(logging.Formatter):
    LOG_COLORS = {
        logging.ERROR: colorama.Fore.RED,
        logging.DEBUG: colorama.Fore.LIGHTMAGENTA_EX,
        logging.WARNING: colorama.Fore.YELLOW,
    }

    def format(self, record, *args, **kwargs):
        # if the corresponding logger has children, they may receive modified
        # record, so we want to keep it intact
        new_record = copy.copy(record)
        if new_record.levelno in self.LOG_COLORS:
            pad = ""
            if new_record.name != "root":
                print(new_record.name)
                pad = "[LIB]: "
            # we want levelname to be in different color, so let's modify it
            new_record.msg = "{pad}{color_begin}{msg}{color_end}".format(
                pad=pad,
                msg=new_record.msg,
                color_begin=self.LOG_COLORS[new_record.levelno],
                color_end=colorama.Style.RESET_ALL,
            )
        # now we can let standart formatting take care of the rest
        return super(ColorFormatter, self).format(new_record, *args, **kwargs)


class LogBase(type):
    debuglevel = logging.root.level

    def __init__(cls, *args):
        super().__init__(*args)
        logger_attribute_name = '_' + cls.__name__ + '__logger'
        logger_debuglevel_name = '_' + cls.__name__ + '__debuglevel'
        logger_name = '.'.join([c.__name__ for c in cls.mro()[-2::-1]])
        LOG_CONFIG = {
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "root": {
                    "()": ColorFormatter,
                    "format": "%(name)s - %(message)s",
                }
            },
            "handlers": {
                "root": {
                    # "level": cls.__logger.level,
                    "formatter": "root",
                    "class": "logging.StreamHandler",
                    "stream": "ext://sys.stdout",
                }
            },
            "loggers": {
                "": {
                    "handlers": ["root"],
                    # "level": cls.debuglevel,
                    "propagate": False
                }
            },
        }
        logging.config.dictConfig(LOG_CONFIG)
        logger = logging.getLogger(logger_name)

        setattr(cls, logger_attribute_name, logger)
        setattr(cls, logger_debuglevel_name, cls.debuglevel)


class vendor(Enum):
    sierra = 0x1199
    quectel = 0x2c7c
    zte = 0x19d2
    telit = 0x413c
    netgear = 0x0846


class deviceclass:
    vid = 0
    pid = 0

    def __init__(self, vid, pid):
        self.vid = vid
        self.pid = pid


class connection:
    def __init__(self, port=""):
        self.serial = None
        self.tn = None
        self.connected = False
        if port == "":
            port = self.detect(port)
        if port != "":
            try:
                self.serial = serial.Serial(port=port, baudrate=115200, bytesize=8, parity='N', stopbits=1, timeout=1)
                self.connected = self.serial.is_open
            except:
                self.connected = False

    def waitforusb(self, vid, pid):
        timeout = 0
        while timeout < 10:
            for device in self.detectusbdevices():
                if device.vid == vid:
                    if device.pid == pid:
                        return True
            time.sleep(1)
            timeout += 1
        return False

    def websend(self, url):
        headers = {'Referer': 'http://192.168.0.1/index.html', 'Accept-Charset': 'UTF-8'}
        r = requests.get(url, headers=headers)
        if b"FACTORY:ok" in r.content or b"success" in r.content:
            print(
                f"Detected a ZTE in web mode .... switching mode success (convert back by sending \"AT+ZCDRUN=F\" via AT port)")
            return self.waitforusb(vendor.zte.value, 0x0016)
        return False

    def getserialports(self):
        return [port for port in serial.tools.list_ports.comports()]

    def detectusbdevices(self):
        dev = usb.core.find(find_all=True)
        ids = [deviceclass(cfg.idVendor, cfg.idProduct) for cfg in dev]
        return ids

    def detect(self, port):
        atvendortable = {
            0x1199: ["Sierra Wireless", 3],
            0x2c7c: ["Quectel", 3],
            0x19d2: ["ZTE", 2],
            0x413c: ["Telit", 3],
            0x0846: ["Netgear", 2],
            0x04E8: ["Samsung", -1]
        }
        mode = "Unknown"
        try:
            for device in self.detectusbdevices():
                if device.vid == vendor.zte.value:
                    if device.pid == 0x0016:
                        print(f"Detected a {atvendortable[device.vid][0]} device with pid {hex(device.pid)} in AT mode")
                        mode = "AT"
                        break
                    elif device.pid == 0x1403:
                        print(
                            f"Detected a {atvendortable[device.vid][0]} device with pid {hex(device.pid)} in Web mode")
                        mode = "Web"
                        self.ZTE_Web()
                        break
                elif device.vid == vendor.netgear.value:
                    try:
                        # vid 0846, netgear mr1100, mr5100
                        self.tn = Telnet("192.168.1.1", 5510)
                        self.connected = True
                    except:
                        self.connected = False
        except:
            print("No libusb driver found. Trying Telnet instead.")
            try:
                # vid 0846, netgear mr1100, mr5100
                self.tn = Telnet("192.168.1.1", 5510)
                self.connected = True
            except:
                self.connected = False
                print("Failed to connect to Telnet.")
                return
            pass
        if mode in ["AT", "Unknown"]:
            for port in self.getserialports():
                if port.vid in atvendortable:
                    portid = port.location[-1:]
                    if int(portid) == atvendortable[port.vid][1]:
                        print(f"Detected a {atvendortable[port.vid][0]} at interface at: " + port.device)
                        return port.device
        return ""

    def ZTE_Web(self):
        url = 'http://192.168.0.1/goform/goform_set_cmd_process?goformId=USB_MODE_SWITCH&usb_mode=6'
        if self.websend(url):
            print("Successfully enabled adb.")

    def readreply(self):
        info = []
        timeout = 0
        if self.serial is not None:
            while True:
                tmp = self.serial.readline().decode('utf-8').replace('\r', '').replace('\n', '')
                if "OK" in tmp:
                    info.append(tmp)
                    return info
                elif "ERROR" in tmp:
                    return -1
                if tmp != "":
                    info.append(tmp)
                else:
                    timeout += 1
                    if timeout == 4:
                        break
        return info

    def send(self, cmd):
        if self.tn is not None:
            self.tn.write(bytes(cmd + "\r", 'utf-8'))
            time.sleep(0.05)
            data = ""
            while True:
                tmp = self.tn.read_eager()
                if tmp != "":
                    data += tmp.strip()
                else:
                    break
            if "ERROR" in data:
                return -1
            return data.split("\r\n")
        elif self.serial is not None:
            self.serial.write(bytes(cmd + "\r", 'utf-8'))
            time.sleep(0.05)
            resp = self.readreply()
            return resp

    def close(self):
        if self.tn is not None:
            self.tn.close()
            self.connected = False
        if self.serial is not None:
            self.serial.close()
            self.connected = False

    def ati(self):
        data = {}
        info = self.send("ATI")
        if info != -1:
            for line in info:
                if "Revision" in line:
                    data["revision"] = line.split(":")[1].strip()
                if "Model" in line:
                    data["model"] = line.split(":")[1].strip()
                if "Quectel" in line:
                    data["vendor"] = "Quectel"
                if "Manufacturer" in line:
                    data["manufacturer"] = line.split(":")[1].strip()
                    if "Sierra Wireless" in data["manufacturer"]:
                        data["vendor"] = "Sierra Wireless"
                    elif "ZTE CORPORATION" in data["manufacturer"]:
                        data["vendor"] = "ZTE"
                    elif "SIMCOM INCORPORATED" in data["manufacturer"]:
                        data["vendor"] = "Simcom"
                    elif "Alcatel" in data["manufacturer"]:
                        data["vendor"] = "Alcatel"
                    elif "Netgear" in data["manufacturer"]:
                        data["vendor"] = "Netgear"
                    elif "SAMSUNG" in data["manufacturer"]:
                        data["vendor"] = "Samsung"
        info = self.send("AT+CGMI")
        if info != -1:
            for line in info:
                if "Quectel" in line:
                    data["vendor"] = "Quectel"
                    break
                elif "Fibucom" in line:
                    data["vendor"] = "Fibucom"
                    break
                elif "Netgear" in line:
                    data["vendor"] = "Netgear"
                    break
                elif "SAMSUNG" in line:
                    data["vendor"] = "Samsung"
                    break
        info = self.send("AT+CGMR")
        if info != -1:
            if len(info) > 1:
                data["model"] = info[1]
        return data


class adbtools(metaclass=LogBase):
    def sendcmd(self, tn, cmd):
        tn.write(bytes(cmd, 'utf-8') + b"\n")
        time.sleep(0.05)
        return tn.read_eager().strip().decode('utf-8')

    def qc_diag_auth(self, diag):
        if diag.connect():
            res = diag.send(b"\x4B\xA3\x06\x00")
            if res[0] == 0x4B:
                challenge = res[4:4 + 8]
                response = hashlib.md5(challenge).digest()
                res = diag.send(b"\x4B\xA3\x07\x00" + response)
                if res[0] == 0x4B:
                    if res[3] == 0x00:
                        print("Auth success")
            res = diag.send(b"\x41" + b"\x30\x30\x30\x30\x30\x30")
            if res[1] == 0x01:
                print("SPC success")
            sp = b"\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFE"
            res = diag.send(b"\x46" + sp)
            if res[0] == 0x46 and res[1] == 0x01:
                print("SP success")
                return True
            else:
                res = diag.send(b"\x25" + sp)
                if res[0] == 0x46 and res[1] == 0x01:
                    print("SP success")
                    return True
        return False

    def meta(self, port, mode=b"METAMETA"):
        while True:
            cn = connection(port)
            if cn.connected:
                while True:
                    resp2 = cn.serial.read(8)
                    if len(resp2) > 0:
                        break
                cn.serial.write(mode)
                response = cn.serial.read(8)
                if len(response) == 0:
                    print("Read timeout while switching to META mode")
                elif response == b'ATEMATEM' or response == b'READYATE':
                    print("META Mode enabled")
                elif response == b'METAFORB':
                    print("META mode forbidden")
                else:
                    print("Invalid response: ", response)

    def run(self, port, enable):
        cn = connection(port)
        if cn.connected:
            info = cn.ati()
            res = False
            if "vendor" in info:
                if info["vendor"] == "Sierra Wireless" or info["vendor"] == "Netgear":
                    res = self.SierraWireless(cn, info, enable)
                elif info["vendor"] == "Quectel":
                    print("Sending at switch command")
                    res = self.Quectel(cn, enable)
                elif info["vendor"] == "ZTE":
                    print("Sending switch command via diag")
                    res = self.ZTE(cn, enable)
                elif info["vendor"] == "Simcom":
                    res = self.Simcom(cn, enable)
                elif info["vendor"] == "Fibocom":
                    res = self.Fibocom(cn, enable)
                elif info["vendor"] == "Alcatel":
                    res = self.Alcatel(enable)
                elif info["vendor"] == "Samsung":
                    res = self.Samsung(cn, enable)
            mode = "enabled" if enable else "disabled"
            if res:
                print("ADB successfully " + mode)
            else:
                print("ADB couldn't be " + mode)
            cn.close()
        else:
            print("No device detected")

    def SierraWireless(self, cn, info, enable):
        print("Sending at switch command")
        kg = SierraKeygen(cn=cn, devicegeneration=None)
        kg.detectdevicegeneration()
        if kg.openlock():
            if enable:
                if cn.send('AT!CUSTOM="ADBENABLE",1\r') != -1:
                    return True
                kg.openlock()
                if cn.send('AT!CUSTOM="TELNETENABLE",1\r') != -1:
                    time.sleep(5)
                    tn = Telnet("192.168.1.1", 23)
                    tn.write(b"adbd &\r\n")
                    info = tn.read_eager()
                    print(info)
                    return True
                if kg.openlock():
                    if info["vendor"] == "Netgear":
                        print("Enabling new port config")
                        if cn.send("AT!UDPID=68E2"):
                            print("Successfully enabled PID 68E2")
                            return True

            index = -1
            type = -1
            bitmask = -1
            resp = cn.send("AT!USBCOMP?")
            if resp != -1:
                print(resp)
                for val in resp:
                    if "Config Index" in val:
                        index = val[val.find("Config Index: ") + 14:]
                    elif "Config Type" in val:
                        type = val[val.find("Config Type: ") + 14:].replace(" (Generic)", "")
                    elif "Interface bitmask" in val:
                        bitmask = val[val.find("Interface bitmask: ") + 19:]
                        if " " in bitmask:
                            bitmask = "0x" + bitmask.split(" ")[0]
                if index != -1 and type != -1 and bitmask != 1:
                    index = int(index)
                    type = int(type)
                    bitmask = int(bitmask, 16)
                    # AT!USBCOMP=<Config Index>,<Config Type>,<Interface bitmask>
                    #  <Config Index>      - configuration index to which the composition applies, should be 1
                    #  <Config Type>       - 1:Generic, 2:USBIF-MBIM, 3:RNDIS
                    #                        config type 2/3 should only be used for specific Sierra PIDs: 68B1, 9068
                    #                        customized VID/PID should use config type 1
                    #  <Interface bitmask> - DIAG     - 0x00000001,
                    #                        ADB      - 0x00000002,
                    #                        NMEA     - 0x00000004,
                    #                        MODEM    - 0x00000008,
                    #                        RMNET0   - 0x00000100,
                    #                        RMNET1   - 0x00000400,
                    #                        RMNET2   - 0x00000800,
                    #                        MBIM     - 0x00001000,
                    #                        RNDIS    - 0x00004000,
                    #                        AUDIO    - 0x00010000,
                    #                        ECM      - 0x00080000,
                    #                        UBIST    - 0x00200000
                    #if enable:
                    cmd = f"AT!USBCOMP={index},{type},%08X" % 0x0080010E
                    #else:
                    #    cmd = f"AT!USBCOMP={index},{type},%08X" % 0x0000010D
                    resp = cn.send(cmd)
                    if resp != -1:
                        resp = cn.send("AT!RESET")
                        if resp != -1:
                            return True
                    return False
                return True
            else:
                if cn.send('AT!CUSTOM="ADBENABLE",0\r') != -1:
                    return True
                kg.openlock()
                if cn.send('AT!CUSTOM="TELNETENABLE",0\r') != -1:
                    return True
        return False

    def Samsung(self, cn, enable):
        if enable:
            if cn.send("AT+USBMODEM=1"):
                return True
            elif cn.send("AT+SYSSCOPE=1,0,0"):
                return True
        else:
            if cn.send("AT+USBMODEM=0"):
                return True
            elif cn.send("AT+SYSSCOPE=1,0,0"):
                return True
        return False

    def Alcatel(self, enable):
        print("Send scsi switch command")
        print("Run \"sudo sg_raw /dev/sg0 16 f9 00 00 00 00 00 00 00 00 00 00 00 00 00 00 -v\" to enable adb")

    def Fibocom(self, cn, enable):
        print("Sending at switch command")
        if enable:
            # FibocomL718:
            if cn.send("AT+ADBDEBUG=1") != -1:
                return True
        else:
            if cn.send("AT+ADBDEBUG=0") != -1:
                return True
        return False

    def Simcom(self, cn, enable):
        print("Sending at switch command")
        if enable:
            # Simcom7600
            if cn.send("AT+CUSBADB=1,1") != -1:
                return True
        else:
            if cn.send("AT+CUSBADB=1,1") != -1:
                return True
        return False

    def ZTE(self, cn, enable):
        if enable:
            if cn.send("AT+ZMODE=1") != -1:
                return True
            else:
                interface = 0
                diag = qcdiag(loglevel=self.__logger.level, portconfig=[[0x19d2, 0x0016, interface]])
                if self.qc_diag_auth(diag):
                    res = diag.send(b"\x4B\xFA\x0B\x00\x01")  # Enable adb serial
                    if res[0] != 0x13:
                        print("Success enabling adb serial")
                    res = diag.send(b"\x4B\x5D\x05\x00")  # Operate ADB
                    if res[0] != 0x13:
                        print("Success enabling adb")
                    return True
                diag.disconnect()
        else:
            if cn.send("AT+ZMODE=F") != -1:
                return True
            else:
                interface = 0
                diag = qcdiag(loglevel=self.__logger.level, portconfig=[[0x19d2, 0x0016, interface]])
                if self.qc_diag_auth(diag):
                    res = diag.send(b"\x4B\xFA\x0B\x00\x00")  # Enable adb serial
                    if res[0] != 0x13:
                        print("Success enabling adb serial")
                    res = diag.send(b"\x4B\x5D\x05\x00")  # Operate ADB
                    if res[0] != 0x13:
                        print("Success enabling adb")
                diag.disconnect()
                return True
        return False

    def Quectel(self, cn, enable: bool = True):
        sn = cn.send("AT+QADBKEY?\r")
        if sn != -1:
            if len(sn) > 1:
                sn = sn[1]
            cc = md5_crypt(salt="")
            code = cc.encrypt("SH_adb_quectel", salt=str(sn))
            code = code[12:28]
            cn.send("AT+QADBKEY=\"%s\"\r" % code)
        if enable:
            if cn.send("AT+QCFG=\"usbcfg\",0x2C7C,0x125,1,1,1,1,1,1,0\r") == -1:
                if cn.send("AT+QLINUXCMD=\"adbd\"") != -1:  # echo test > /dev/ttyGS0
                    return True
            else:
                return True
        else:
            if cn.send("AT+QCFG=\"usbcfg\",0x2C7C,0x125,1,1,1,1,1,0,0\r") != -1:
                return True
        return False


def main():
    version = "1.2"
    info = '\nModem Gimme-ADB ' + version + ' (c) B. Kerler 2020-2023\n-------------------------------------------\n'
    parser = argparse.ArgumentParser(formatter_class=argparse.RawDescriptionHelpFormatter, description=info)
    parser.add_argument(
        '-mode', help='Mode: enable or disable',
        default="enable")
    parser.add_argument(
        '-port', '-p',
        help='[Optional] use com port for at',
        default="")
    parser.add_argument(
        '-logfile', '-l',
        help='use logfile for debug log',
        default="")
    args = parser.parse_args()
    ad = adbtools()
    print(info)
    print("Supported modules: ZTE,Netgear,Sierra Wireless,Samsung,Alcatel,Quectel,Fibucom")
    if args.mode.lower() == "enable":
        enable = True
    else:
        enable = False
    ad.run(port=args.port, enable=enable)
    # ad.meta(port=args.port)


if __name__ == "__main__":
    main()
